RocketMQ Connect実践編 4
SFTPサーバー(ファイルデータ)-> RocketMQ Connect -> SFTPサーバー(ファイル)
準備
RocketMQの起動
- Linux/Unix/Mac
- 64ビットJDK 1.8以上;
- Maven 3.2.x以上;
- RocketMQを起動します。 RocketMQ 4.x または RocketMQ 5.x のいずれかのバージョンを使用できます。
- ツールを使用してRocketMQのメッセージ送受信テストを行います。
ここでは、環境変数NAMESRV_ADDRを使用して、ツールクライアントにRocketMQの名前サーバーアドレス(localhost:9876)を知らせます。
#$ cd distribution/target/rocketmq-4.9.7/rocketmq-4.9.7
$ cd distribution/target/rocketmq-5.1.4/rocketmq-5.1.4
$ export NAMESRV_ADDR=localhost:9876
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...
注記: RocketMQには、TopicとGroupを自動的に作成する機能があります。メッセージの送受信時に対応するTopicまたはGroupが存在しない場合、RocketMQは自動的にそれらを作成します。そのため、事前にTopicとGroupを作成する必要はありません。
コネクタランタイムのビルド
git clone https://github.com/apache/rocketmq-connect.git
cd rocketmq-connect
export RMQ_CONNECT_HOME=`pwd`
mvn -Prelease-connect -Dmaven.test.skip=true clean install -U
SFTPコネクタプラグインのビルド
cd $RMQ_CONNECT_HOME/connectors/rocketmq-connect-sftp/
mvn clean package -Dmaven.test.skip=true
コンパイルされたSFTP RocketMQコネクタのjarファイルを、ランタイムロード用のプラグインディレクトリに配置します。
mkdir -p /Users/YourUsername/rocketmqconnect/connector-plugins
cp target/rocketmq-connect-sftp-0.0.1-SNAPSHOT-jar-with-dependencies.jar /Users/YourUsername/rocketmqconnect/connector-plugins
スタンドアロンモードでのコネクタワーカーの実行
connect-standalone.conf
ファイルを修正して、RocketMQ接続アドレスなどの情報を設定します。
cd $RMQ_CONNECT_HOME/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
vim conf/connect-standalone.conf
設定情報の例を以下に示します。
workerId=standalone-worker
storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot
## Http port for user to access REST API
httpPort=8082
# Rocketmq namesrvAddr
namesrvAddr=localhost:9876
# RocketMQ acl
aclEnable=false
#accessKey=rocketmq
#secretKey=12345678
clusterName="DefaultCluster"
# Plugin path for loading Source/Sink Connectors
pluginPaths=/Users/YourUsername/rocketmqconnect/connector-plugins
スタンドアロンモードでは、RocketMQ Connectは同期チェックポイント情報を、`storePathRootDir`で指定されたローカルファイルディレクトリに永続的に保存します。
storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot
同期チェックポイントをリセットする必要がある場合は、永続化されたチェックポイント情報ファイルを削除する必要があります。
rm -rf /Users/YourUsername/rocketmqconnect/storeRoot/*
スタンドアロンモードでコネクタワーカーを起動するには
sh bin/connect-standalone.sh -c conf/connect-standalone.conf &
SFTPサーバーの設定
SFTP(SSH File Transfer Protocol)は、コンピュータ間での安全なファイル転送に使用されるファイル転送プロトコルです。SFTPはSSH(Secure Shell)プロトコル上に構築されており、暗号化と認証を利用しています。
macOSの組み込みSFTPサービス(「リモートログイン」アクセスを有効にすることで)を使用します。詳細な手順については、Macへのリモートコンピュータからのアクセスを許可するドキュメントを参照してください。
ソーステストファイルの作成
source.txt
という名前のテストファイルを作成し、いくつかのテストデータを書き込みます。
mkdir -p /Users/YourUsername/rocketmqconnect/sftp-test/
cd /Users/YourUsername/rocketmqconnect/sftp-test/
touch source.txt
echo 'John Doe|100000202211290001|20221129001|30000.00|2022-11-28|03:00:00|7.00
Jane Smith|100000202211290002|20221129002|40000.00|2022-11-28|04:00:00|9.00
Bob Johnson|100000202211290003|20221129003|50000.00|2022-11-28|05:00:00|12.00' >> source.txt
SFTPサービスにログインして、正常にアクセスできることを確認します。次のコマンドを入力し、パスワードを入力します。
# sftp -P port YourUsername@hostname
sftp -P 22 YourUsername@127.0.0.1
注記: これはローカルMAC OSが提供するSFTPサービスであるため、アドレスは127.0.0.1
、ポートはデフォルトの22です。
sftp> cd /Users/YourUsername/rocketmqconnect/sftp-test/
sftp> ls source.txt
sftp> bye
コネクタの起動
SFTPソースコネクタの起動
次のコマンドを実行してSFTPソースコネクタを起動します。このコネクタはSFTPサービスに接続してsource.txt
ファイルから読み取ります。ファイル内の各テキスト行に対して、コネクタは内容を解析して一般的なConnectRecordオブジェクトにパッケージ化し、それをRocketMQトピックに送信してシンクコネクタが消費できるようにします。
curl -X POST --location "http://localhost:8082/connectors/SftpSourceConnector" --http1.1 \
-H "Host: localhost:8082" \
-H "Content-Type: application/json" \
-d '{
"connector.class": "org.apache.rocketmq.connect.http.sink.SftpSourceConnector",
"host": "127.0.0.1",
"port": 22,
"username": "YourUsername",
"password": "yourPassword",
"filePath": "/Users/YourUsername/rocketmqconnect/sftp-test/source.txt",
"connect.topicname": "sftpTopic",
"fieldSeparator": "|",
"fieldSchema": "username|idCardNo|orderNo|orderAmount|trxDate|trxTime|profit"
}'
curlリクエストがステータス: 200を返した場合、コネクタが正常に作成されたことを示します。レスポンスの例を以下に示します。
{"status":200,"body":{"connector.class":"...
ファイルソースコネクタが正常に起動したことを確認するには、次のコマンドを実行します。
tail -100f ~/logs/rocketmqconnect/connect_runtime.log
コネクタSftpSourceConnectorの起動とターゲット状態STARTEDに設定が成功しました!!
SFTPシンクコネクタの起動
次のコマンドを実行してSFTPシンクコネクタを起動します。このコネクタはRocketMQトピックを購読してメッセージを消費し、それぞれを1行のテキストに変換して、SFTPプロトコルを使用して宛先ファイルsink.txt
に書き込みます。
curl -X POST --location "http://localhost:8082/connectors/SftpSinkConnector" --http1.1 \
-H "Host: localhost:8082" \
-H "Content-Type: application/json" \
-d '{
"connector.class": "org.apache.rocketmq.connect.http.sink.SftpSinkConnector",
"host": "127.0.0.1",
"port": 22,
"username": "YourUsername",
"password": "yourPassword",
"filePath": "/Users/YourUsername/rocketmqconnect/sftp-test/sink.txt",
"connect.topicnames": "sftpTopic",
"fieldSeparator": "|",
"fieldSchema": "username|idCardNo|orderNo|orderAmount|trxDate|trxTime|profit"
}'
curlリクエストがステータス: 200を返した場合、コネクタが正常に作成されたことを示します。レスポンスの例を以下に示します。
{"status":200,"body":{"connector.class":"...
ログを確認して、SFTPシンクコネクタの正常な起動を確認します。
tail -100f ~/logs/rocketmqconnect/connect_runtime.log
コネクタSftpSinkConnectorの起動とターゲット状態STARTEDに設定が成功しました!!
次のコマンドを実行して、データが宛先ファイルに書き込まれたことを確認します。
cat /Users/YourUsername/rocketmqconnect/sftp-test/sink.txt
sink.txt
ファイルが生成され、その内容がsource.txt
ファイルの内容と一致する場合、プロセス全体が正常に動作しています。
source.txt
ファイルにさらにテストデータを追加して、テストを続行します。
cd /Users/YourUsername/rocketmqconnect/sftp-test/
echo 'John Doe|100000202211290001|20221129001|30000.00|2022-11-28|03:00:00|7.00
Jane Smith|100000202211290002|20221129002|40000.00|2022-11-28|04:00:00|9.00
Bob Johnson|100000202211290003|20221129003|50000.00|2022-11-28|05:00:00|12.00' >> source.txt
# Wait a few seconds to give the connector time to replicate data to the sink file.
sleep 10
cat /Users/YourUsername/rocketmqconnect/sftp-test/sink.txt
注記: rocketmq-connect-sftp
はRocketMQトピックとの間でメッセージの送受信に通常のメッセージ
を使用するため、ファイルの内容の順序は異なる場合があります。これは順序付きメッセージ
とは異なり、通常のメッセージ
の消費では順序は保証されません。